import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demonstrates calling a user-defined function from the Flink batch Table API
 * and from a SQL query.
 *
 * <p>{@code HashCode} is neither defined nor imported in this file; it is
 * presumably a function class declared in the same package (verify).
 */
public class UDF {

    /**
     * Registers a {@code HashCode} instance under the name {@code hashCode},
     * builds a one-column table from three strings, and prints the rows
     * produced by invoking the function in two ways: through a Table API
     * expression string and through SQL.
     *
     * @param args command-line arguments (not read)
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(batchEnv);

        // Make the function callable by the name "hashCode" in expressions and SQL.
        tableEnv.registerFunction("hashCode", new HashCode(10));

        // Input: three strings exposed as a table with a single column "content".
        DataSet<String> words = batchEnv.fromCollection(Arrays.asList("beer","diaper","rubber"));
        Table wordsTable = tableEnv.fromDataSet(words,"content");

        // ------------------------------------------------------------------
        // Usage 1: Table API expression string. The function is invoked both
        // in postfix form (content.hashCode()) and in prefix form
        // (hashCode(content)).
        Table viaTableApi = wordsTable.select("content, content.hashCode(), hashCode(content)");
        tableEnv.toDataSet(viaTableApi, Row.class).print();

        // ------------------------------------------------------------------
        // Usage 2: SQL. The Table object is concatenated into the query text,
        // so its string form is what appears as the table name after FROM.
        // An alternative is to register the table under an explicit name
        // (e.g. with tableEnv.registerTable) and reference that name instead.
        String sql = "SELECT content,HASHCODE(content) FROM "+wordsTable;
        Table viaSql = tableEnv.sqlQuery(sql);
        tableEnv.toDataSet(viaSql,Row.class).print();
    }

}
